iT邦幫忙

2025 iThome 鐵人賽

DAY 7
2

https://ithelp.ithome.com.tw/upload/images/20250826/2012475895CmxLv0Y3.jpg
我們講到 Speed Layer 就像一隊外送員,專門把 Kafka 新鮮出爐的訂單火速送到資料庫。但是,衝得快不代表不會翻車。今天我們來看第三種真實會發生的「外送災難」:訂單堆爆。

外送員的終極噩夢:包裹堆爆事件

想像一下,你是物流平台的調度員。剛開始一切都很順利:

  • 倉庫(Kafka)出貨速度:每秒 100 個包裹
  • 外送員(Stream Processing)取貨速度:每秒 100 個包裹
  • 配送中心(PostgreSQL)登記處理速度:每秒 100 個包裹

看起來很完美,對吧?但突然間…

雙十一購物節!

  • 倉庫出貨速度:每秒 300 個包裹
  • 外送員取貨速度:每秒 300 個包裹
  • 配送中心登記處理速度:每秒 50 個包裹(系統開始卡住)

你的外送員開始在配送中心門口大排長龍,包裹越堆越多,記憶體開始爆炸,系統即將崩潰…

真實世界的「包裹堆爆」災難

這不是虛構的故事,這是每個流處理工程師都會遇到的經典問題:背壓(Backpressure)

當你的 PostgreSQL 因為各種原因變慢時:

  • 網路延遲增加
  • 資料庫鎖競爭
  • 硬碟 I/O 瓶頸
  • 連接池耗盡

你的流處理系統就會開始「消化不良」,最終可能發生任何問題而掛掉。

常見反應

很多團隊第一反應是:
「開機器啊!加 CPU、加 RAM、加 DB 寫入節點!」

雖然短期有效,但你永遠追不上流量高峰,因為促銷活動可能瞬間多 100 倍訂單,無限加錢不現實。

解決方案:給外送員裝個「智能煞車器」

今天我們要為我們的 Simple Streaming 框架實作一個背壓機制。核心概念很簡單:
當下游處理不過來時,讓上游聰明地「踩煞車」

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

第一步:設計我們的「緊急信號」

class StreamingOverloadException(Exception):
    """
    當系統過載時拋出的異常
    """
    def __init__(self, message: str, pause_seconds: float = 3.0):
        super().__init__(message)
        self.pause_seconds = pause_seconds  # 建議休息時間

第二步:讓 PostgreSQL Sink 學會「喊累」

我們的 PostgreSQL Sink 需要變聰明,學會監控自己的處理能力。讓我們一步步解析背壓機制的關鍵設計:

背壓檢測流程圖解

我們的 PostgreSQL Sink 需要學會監控自己的健康狀態。讓我們看看它是如何工作的:

Normal Processing Flow (正常處理流程):

Step 1: Data Arrives
┌─────────────────┐
│   New Message   │ ──► write() method
└─────────────────┘

Step 2: Buffer Management
┌─────────────────┐    YES   ┌──────────────────────┐
│ Buffer >= 100?  │ ────────►│  _flush_and_clear()  │
└─────────────────┘          └──────────────────────┘
         │ NO                            │
         ▼                               ▼
┌─────────────────┐          ┌──────────────────────┐
│  Add to Buffer  │          │    Start Timer       │
└─────────────────┘          └──────────────────────┘

Step 3: Performance Monitoring
┌──────────────────────┐     ┌──────────────────────┐
│   Execute Batch      │────►│   Calculate Duration │
│   Insert to DB       │     │   (end_time - start) │
└──────────────────────┘     └──────────────────────┘
                                        │
                                        ▼
                             ┌──────────────────────┐
                             │  _check_overload()   │
                             │  Performance Judge   │
                             └──────────────────────┘

設計思路說明

Step 1 - 數據入口:每個新訊息都通過 write() 方法進入系統。

def write(self, message: Dict[str, Any]):
    """將 message 加入 buffer,達到 batch_size 時批次寫入"""
    # 從 message 取得實際資料(延續 Day 6 設計)
    data = message.get('value', {})
    if not data:
        return
    
    # 加入 buffer
    self._buffer.append(data)
    
    # 達到批次大小,批量寫入
    if len(self._buffer) >= self.batch_size:
        self._flush_and_clear()  # 觸發 Step 2

Step 2 - 智能緩衝:當 buffer 達到 100 筆時,觸發 _flush_and_clear()。這裡是背壓檢測的關鍵入口。

Step 3 - 關鍵創新:在 _flush_and_clear() 中添加計時器,讓系統感知自己的處理速度:

def _flush_and_clear(self):
    """批次寫入資料,順便當個「身體健康監測器」"""
    if not self._buffer:
        return
        
    # 關鍵:開始計時(對應圖中 "Start Timer")
    flush_start_time = time.time()
    buffer_size = len(self._buffer)
    
    try:
        # 執行批次寫入到 PostgreSQL(對應圖中 "Execute Batch Insert")
        # ... 執行批量插入邏輯
        
        # 計算處理時間(對應圖中 "Calculate Duration")
        flush_duration = time.time() - flush_start_time
        
        # 關鍵:檢查是否過載(對應圖中 "_check_overload()")
        self._check_overload(flush_duration, buffer_size)
        
        # 清空 buffer
        self._buffer.clear()
        
    except Exception as e:
        logger.error(f"批次寫入失敗: {e}")
        raise

背壓觸發機制

關鍵在於如何判斷系統「真的累了」,而不是偶爾的性能波動。我們採用了「三振出局」策略:

Overload Detection Logic (過載檢測邏輯):

┌──────────────────────┐
│ Duration > 2.0s ?    │
└──────────────────────┘
           │
    ┌─────────────────┐
    │ YES       │ NO  │
    ▼                 ▼
┌─────────────┐  ┌─────────────┐
│ slow_count++│  │ slow_count--│
└─────────────┘  └─────────────┘
    │                │
    ▼                ▼
┌─────────────┐  ┌─────────────┐
│Count >= 3 ? │  │ Keep Normal │
└─────────────┘  │ Processing  │
    │            └─────────────┘
    ▼ YES
┌─────────────────────────────────────┐
│       BACKPRESSURE TRIGGER          │
│                                     │
│       pause_time = 5.0s             │
│  throw StreamingOverloadException   │
└─────────────────────────────────────┘

設計思路說明

閾值設定(2.0秒):為什麼選擇 2 秒?在正常情況下,批次寫入 100 筆記錄應該在幾百毫秒內完成。2 秒已經是明顯的性能異常信號。這個閾值可以根據實際硬體和資料庫配置調整。

計數器機制(slow_count):單次慢速處理可能只是偶發情況(網路抖動、其他查詢競爭等)。我們使用計數器來區分「偶發慢速」和「持續過載」:

  • 慢速處理:計數器 +1
  • 正常處理:計數器 -1(但最小為 0)

三振出局策略:連續 3 次慢速處理才觸發背壓。這個設計避免了過於敏感的誤報,同時能快速響應真正的系統過載。

固定暫停時間:5 秒的暫停時間足夠讓大部分暫時性問題(如短期鎖競爭、記憶體 GC 等)自行恢復,同時不會讓系統停頓太久。

首先,初始化背壓參數

def __init__(self, batch_size: int = 100):
    ...
    
    # 新增:背壓檢測參數(對應上方圖解中的各個閾值)
    self._slow_flush_count = 0        # 計數器:追蹤連續慢速處理次數
    self._max_slow_flush = 3          # 觸發閾值:連續 3 次就觸發背壓
    self._slow_flush_threshold = 2.0  # 時間閾值:超過 2 秒算慢速處理

然後,對應的檢測邏輯實現

def _check_overload(self, flush_duration: float):
    """檢測是否過載 - 就像外送員檢查自己是不是太累了"""
    
    # 步驟 1:檢查處理時間是否超過閾值(對應圖中 "Duration > 2.0s ?")
    if flush_duration > self._slow_flush_threshold:  # 2.0 秒閾值
        
        # 步驟 2:增加慢速計數(對應圖中 "slow_count++")
        self._slow_flush_count += 1
        logger.warning(f"處理變慢了: {flush_duration:.2f}s (第 {self._slow_flush_count} 次)")
        
        # 步驟 3:檢查是否達到觸發條件(對應圖中 "Count >= 3 ?")
        if self._slow_flush_count >= self._max_slow_flush:  # 連續 3 次
            
            # 步驟 4:觸發背壓機制(對應圖中 "BACKPRESSURE TRIGGER")
            error_msg = f"PostgreSQL 處理過載:連續 {self._slow_flush_count} 次慢速處理"
            pause_time = 5.0  # 固定休息 5 秒,簡單易懂
            
            self._slow_flush_count = 0  # 重置計數器
            
            # 拋出過載異常,觸發背壓機制!
            raise StreamingOverloadException(error_msg, pause_seconds=pause_time)
    else:
        # 處理速度恢復正常,給點鼓勵(對應圖中 "slow_count--")
        self._slow_flush_count = max(0, self._slow_flush_count - 1)

第三步:讓 SimpleStreamingEngine 學會「緊急煞車」

現在我們需要讓 SimpleStreamingEngine 學會處理這個「求救信號」。讓我們分析應用層的背壓響應機制:

應用層背壓響應流程

SimpleStreamingEngine 需要在訊息處理的入口就進行背壓檢查,確保不會遺失任何數據:

Step 1: Message Processing Entry Point
┌─────────────────────────────────────┐
│         New Message Arrives         │
└─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────┐
│       message_handler()             │
│                                     │
│    First Check: _should_pause()?    │
└─────────────────────────────────────┘
                    │
        ┌──────────────────────┐
        │ YES          │ NO    │
        ▼                      ▼
┌─────────────────┐    ┌─────────────────┐
│  Pause Kafka    │    │ Process Message │
│  Consumer       │    │ Normally        │
│                 │    │                 │
└─────────────────┘    └─────────────────┘
                                │
                                ▼
                    ┌─────────────────────┐
                    │ Exception Handler   │
                    └─────────────────────┘

Step 1 - 訊息處理入口:每個來自 Kafka 的訊息都會通過 _create_message_handler() 創建的處理器:

def _create_message_handler(self, source):
    """創建訊息處理器 - 加上智能煞車功能"""
    def message_handler(message):
        # 第一道防線:檢查系統狀態(對應圖中 "_should_pause()?")
        if self._should_pause():
            logger.debug("因背壓暫停,暫停 Kafka 消費")
            self._pause_kafka_consumer(source)  # 對應圖中 "Pause Kafka Consumer"
            return
        
        # 正常處理流程(對應圖中 "Process Message Normally")
        try:
            df = self._source_dataframe_map[source]
            df.process_message(message)
        except StreamingOverloadException as e:
            # 異常處理:收到下游求救信號(對應圖中 "Exception Handler")
            self._handle_overload(e, source)
        except Exception as e:
            logger.error(f"其他錯誤: {e}")
    
    return message_handler

關鍵設計思路

  • 優先檢查:在處理任何訊息前,先檢查系統是否需要暫停
  • 安全暫停:不丟棄訊息,而是暫停 Kafka Consumer,讓訊息留在隊列中
  • 異常捕捉:捕捉來自 PostgreSQL Sink 的過載異常

緊急煞車觸發與恢復機制

當 PostgreSQL Sink 拋出 StreamingOverloadException 時,整個 SimpleStreamingEngine 進入緊急狀態:

Normal Processing State:
┌─────────────────────────────────────┐
│          _paused = False            │
│         Normal Processing           │
└─────────────────────────────────────┘
                    │
                    ▼ StreamingOverloadException
┌─────────────────────────────────────┐
│         EMERGENCY BRAKE             │
│                                     │
│  _paused = True                     │
│  _pause_until = now + pause_seconds │
│  _overload_count++                  │
└─────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────┐
│          PAUSED STATE               │
│                                     │
│   Kafka Consumer is paused          │
│   Messages remain in Kafka queue    │
│   System gives DB time to recover   │
└─────────────────────────────────────┘
                    │
                    ▼ Time expires
┌─────────────────────────────────────┐
│         AUTO RECOVERY               │
│                                     │
│  current_time >= _pause_until ?     │
│  YES: _paused = False               │
│  Resume normal processing           │
└─────────────────────────────────────┘

緊急煞車處理:當接收到過載異常時(對應圖中 "EMERGENCY BRAKE"):

def _handle_overload(self, error: StreamingOverloadException, source):
    """處理過載 - 緊急煞車程序"""
    # 統計過載次數
    self._overload_count += 1
    
    # 設定系統狀態(對應圖中狀態變更)
    self._paused = True
    self._pause_until = time.time() + error.pause_seconds  # 計算暫停到什麼時候
    
    logger.warning(
        f"系統過載 (第 {self._overload_count} 次)! "
        f"暫停 {error.pause_seconds:.1f} 秒讓系統喘口氣..."
    )
    
    # 立即暫停 Kafka Consumer(對應圖中 "Kafka Consumer is paused")
    self._pause_kafka_consumer(source)

狀態檢查與自動恢復:每次處理訊息前都會檢查(對應圖中 "AUTO RECOVERY"):

def _should_pause(self) -> bool:
    """檢查是否該暫停 - 就像看紅綠燈"""
    current_time = time.time()
    
    # 檢查當前狀態
    if self._paused and current_time < self._pause_until:
        return True  # 還在暫停期間(對應圖中 "PAUSED STATE")
        
    elif self._paused and current_time >= self._pause_until:
        # 時間到了,自動恢復(對應圖中 "AUTO RECOVERY")
        self._paused = False
        logger.info("暫停結束,恢復處理訊息")
        # 重要:恢復所有 Kafka Consumer
        self._resume_all_kafka_consumers()
    
    return False  # 正常處理狀態

架構回顧

經過幾天的學習,我們的 SimpleStreamingEngine 已經從基礎的資料流處理進化為更穩定的流處理系統:

Day 7: SimpleStreamingEngine with Backpressure (帶背壓機制的流處理架構)

┌─────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│   KafkaSource   │───►│DataFrame.filter()│───►│PostgreSQLSink    │
│                 │    │                  │    │                  │
│ • Topic consume │    │ • Lambda filter  │    │ • Batch buffer   │
│ • Consumer pause│    │ • Data transform │    │ • Timer trigger  │
│ • Auto resume   │    │                  │    │ • Bulk insert    │
│                 │    │                  │    │ • Overload detect│
│                 │    │                  │    │ • Exception throw│
└─────────────────┘    └──────────────────┘    └──────────────────┘
         ▲                                               │
         │         ┌─────────────────────────────────────┘
         │         │
         │         ▼
┌─────────────────────────────────────────────────────────────┐
│              SimpleStreamingEngine                          │
│                                                             │
│  • Message handler with backpressure check                  │
│  • _should_pause() status management                        │
│  • _handle_overload() emergency brake                       │
│  • Kafka Consumer pause/resume control                      │
│  • Auto recovery after timeout                              │
└─────────────────────────────────────────────────────────────┘

從單純的「訊息進→處理→出」,進化為具備智能流量控制的完整系統:

  • Day 5: 建立基礎流處理能力
  • Day 6: 優化吞吐量,提升處理效率
  • Day 7: 保障系統穩定,防止過載崩潰

總結

我們學會了什麼

背壓機制就像是給你的流處理系統請了一個聰明的調度員。他會讓整個系統跑得更穩、更持久。

核心設計原則

記住,在高吞吐量系統的世界裡,優雅地處理過載比暴力地提升效能更重要。當你的 PostgreSQL 開始「喊累」時,讓系統聽話地慢下來,往往比強行「鞭策」它更明智。

關鍵技術要點

  • 三振出局策略:避免誤報,確保真正過載才觸發
  • 固定暫停時間:5 秒簡單易懂,適合教學和理解
  • Kafka Consumer 暫停:保證數據不遺失的核心設計
  • 自動恢復機制:系統自癒能力,無需人工干預

生產環境的背壓機制

在真實的生產環境中,背壓機制遠比我們今天實作的版本更加複雜和精細:

業界的背壓實現

Apache Flink 成熟的流處理框架都內建了背壓機制:

  • 自動檢測:系統自動監控處理速度和資源使用情況
  • 動態調整:根據負載情況自動調整處理速度
  • 分散式支援:在多節點環境中協調背壓狀態

教學重點回顧

今天我們透過簡化的實作理解了背壓機制的核心概念

  • 邏輯清晰,容易理解設計思路
  • 包含核心要素:檢測、觸發、暫停、恢復
  • 保證數據安全,不會遺失訊息

就像我們一開始說的物流平台一樣 - 最好的服務不是讓外送員累死,而是讓每個包裹都能穩定送達,讓客戶明天還願意繼續下單。

Day 8 預告

今天我們學會了讓系統在過載時「踩煞車」,但還有一個關鍵問題沒解決:如何精準追蹤處理進度?

在流處理系統中,當系統重啟或發生故障時,我們需要知道:已經處理到 Kafka 的哪個 offset?哪些數據已經成功寫入資料庫?如何避免重複處理或數據遺失?

明天我們來聊聊 手動控制 commit - 也就是處理進度管理的核心技術。我們會透過 checkpoint 機制實現:

  • 精準記錄 Kafka offset 和處理狀態
  • 故障重啟時從正確位置繼續處理
  • 避免重複處理和數據不一致問題
  • 靈活控制 commit 時機,確保數據安全

上一篇
【知其然,更知其所以然】Day 6:批量寫入的威力 - 讓 Kafka Consumer 暢行無阻
下一篇
【知其然,更知其所以然】Day 8: Checkpoint 與進度保存
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言